package cn.ac.iie.dpl.data.mmloader.mq;

import cn.ac.iie.dpl.data.mmloader.globalParas.GlobalParas;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import org.apache.log4j.Logger;

/**
 * Static holder for a single RocketMQ {@link DefaultMQProducer} plus a blocking
 * "send until acknowledged" helper.
 *
 * <p>Usage: call {@link #init(String, String)} once, then
 * {@link #sendMessage(String, byte[], long)} as needed, and finally
 * {@link #destroyPool()}. The class performs no synchronization of its own around
 * the shared {@link #producer} field.
 */
public class RocketProducer {

    public static Logger log = Logger.getLogger(RocketProducer.class.getName());
    public static DefaultMQProducer producer = null;

    /** Pause between two send attempts, in milliseconds. */
    private static final long RETRY_SLEEP_MS = 200L;

    /**
     * Creates, configures and starts the shared producer and stores it in {@link #producer}.
     *
     * <p>A failure to start is logged, not thrown; the (un-started) producer is still
     * stored, exactly as before, so callers see the failure through the log only.
     *
     * @param producerGroupname producer group name; also used as prefix of the instance name
     * @param namesrvAddr       name server address handed to the producer
     */
    public static void init(String producerGroupname, String namesrvAddr) {
        DefaultMQProducer created = getDefaultMQProducer(producerGroupname, namesrvAddr);
        try {
            created.start();
            // Only report success once start() has actually returned normally.
            log.info("init rocket mq ok for " + producerGroupname);
        } catch (MQClientException ex) {
            log.error("init rocket mq fail for " + producerGroupname + "," + ex, ex);
        }
        producer = created;
    }

    /**
     * Builds a producer with this application's fixed settings. The instance is
     * neither started nor published to the static field here; {@link #init} does both.
     *
     * @param producerGroupname producer group name
     * @param namesvrAddr       name server address
     * @return the configured, not yet started producer
     */
    private static DefaultMQProducer getDefaultMQProducer(String producerGroupname, String namesvrAddr) {
        DefaultMQProducer created = new DefaultMQProducer(producerGroupname);
        created.setNamesrvAddr(namesvrAddr);
        created.setInstanceName(producerGroupname + GlobalParas.machineIP);
        created.setClientIP(GlobalParas.machineIP);
        created.setRetryTimesWhenSendFailed(3);
        created.setSendMsgTimeout(5000);
        created.setCompressMsgBodyOverHowmuch(32 * 1024);
        created.setMaxMessageSize(4096 * 1024);
        created.setClientCallbackExecutorThreads(50);
        created.setRetryAnotherBrokerWhenNotStoreOK(true);
        return created;
    }

    /**
     * Sends {@code pData} to {@code topic} and blocks until a send attempt reports
     * {@link SendStatus#SEND_OK}.
     *
     * <p>Every unsuccessful attempt (null result, any status other than
     * {@code SEND_OK}, or any exception from the client) is logged and followed by a
     * {@value #RETRY_SLEEP_MS} ms pause before the next attempt. There is no upper
     * bound on the number of attempts.
     *
     * <p>Interrupts do not abort the retry loop, but the calling thread's interrupt
     * status is restored before this method returns.
     *
     * @param topic destination topic
     * @param pData message body; must not be {@code null}
     * @param count number of records contained in {@code pData}; used for logging only
     * @throws NullPointerException if {@code pData} is {@code null}
     */
    public static void sendMessage(String topic, byte[] pData, long count) {
        if (pData == null) {
            // Fail fast: the retry loop could never succeed and its error path reads pData.length.
            throw new NullPointerException("pData must not be null");
        }
        long bg = System.currentTimeMillis();
        Message msg = new Message(topic, pData);
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
                        log.info("send " + count + " records and size " + pData.length + " bytes,use " + (System.currentTimeMillis() - bg) + " ms for " + topic + ":" + sendResult);
                        return;
                    }
                    logUnsuccessfulResult(sendResult);
                } catch (Exception ex) {
                    if (ex instanceof InterruptedException) {
                        // Remember the interrupt; it is re-asserted in the finally block.
                        interrupted = true;
                    }
                    log.error(ex + ",the information is:topic --> " + topic +",body size is " +pData.length+"，will retry,", ex);
                }
                // Back off after EVERY failed attempt so no failure mode can spin the CPU.
                if (!sleepBeforeRetry()) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Logs why a returned (non-exceptional) send attempt is not counted as success. */
    private static void logUnsuccessfulResult(SendResult sendResult) {
        if (sendResult == null) {
            log.warn("send message fail,the send result is null,will sleep and retry ...");
        } else if (sendResult.getSendStatus() == SendStatus.FLUSH_DISK_TIMEOUT) {
            log.warn("send message fail one time for write disk fail,will sleep and retry,the information is " + producer.getClientIP() + " " + producer.getProducerGroup());
        } else {
            log.warn("send message fail one time for send status " + sendResult.getSendStatus() + ",will sleep and retry,the information is " + producer.getClientIP() + " " + producer.getProducerGroup());
        }
    }

    /**
     * Sleeps for {@link #RETRY_SLEEP_MS}.
     *
     * @return {@code false} if the sleep was cut short by an interrupt, {@code true} otherwise
     */
    private static boolean sleepBeforeRetry() {
        try {
            Thread.sleep(RETRY_SLEEP_MS);
            return true;
        } catch (InterruptedException ignored) {
            // The caller records the interrupt and restores the flag when it finishes.
            return false;
        }
    }

    /** Shuts down the shared producer; does nothing if {@link #init} was never called. */
    public static void destroyPool() {
        DefaultMQProducer current = producer;
        if (current != null) {
            current.shutdown();
        }
    }
}